# 线程池

  • 线程池在系统启动时即创建大量空闲的线程,程序将一个 Runnable 对象或 Callable 对象传给线程池,线程池就会启动一个线程来执行它们的 run()call() 方法
  • run()call() 方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个 Runnable 对象或 Callable 对象的 run()call() 方法
  • 线程池的组成部分
    1. 线程管理池(ThreadPool):用于创建并管理线程池,包括创建,销毁,添加新任务
    2. 工作线程(PoolWorker):线程池中的线程在没有任务的时候处于等待状态,可以循环的执行任务
    3. 任务接口(Task):每个任务必须实现接口,用来提供工作线程调度任务的执行,规定了任务的入口以及执行结束的收尾工作和任务的执行状态等
    4. 任务队列(TaskQueue):用于存放没有处理的任务,提供一种缓存机制
  • 线程池的使用:1. 创建线程池;2. 向线程池提交任务;3. 关闭线程池
  • 性质不同的任务可以用不同规模的线程池分开处理
    • 对于 CPU 密集型任务,应配置尽可能小的线程(减少上下文切换),如 N~cpu~+1
    • 对于 IO 密集型任务,由于线程并不是一直在执行任务,则应配置尽可能多的线程,应根据具体的 IO 阻塞时长进行考量决定(在 I/O 等待期间,OS 可以通过为另外的线程执行更多代码来更好地利用 CPU 资源),如 2*N~cpu~
    • 对于混合型的任务:如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量;如果这两个任务执行时间相差太大,则没必要进行分解
  • 线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
    线程池大小与处理器的利用率之比的计算公式

线程池相关类

  • 复用线程池不代表应用程序始终使用同一个线程池,应该根据任务的性质来选用不同的线程池;考虑按需使用隔离的线程池,以减少任务间的相互干扰

# Executors 工具类

  • 创建线程池的静态方法
    • ExecutorService newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程(无界线程池,60s 未被使用的线程会被终止并从缓存中移除)
      (不限制最大线程数并且使用没有任何容量的 SynchronousQueue 作为队列,容易开启太多线程导致 OOM)
      new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
    • ExecutorService newFixedThreadPool(int nThreads):创建一个可重用的、具有固定核心线程数的线程池
      (使用无界队列,队列堆积太多数据导致 OOM)
      new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    • ExecutorService newSingleThreadExecutor():创建一个只有单线程的线程池
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    • ScheduledExecutorService newScheduledThreadPool(int corePoolSize):创建一个线程池,可在指定延迟后执行或定期执行线程任务
      new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())

SynchronousQueue 不储存元素的阻塞队列;LinkedBlockingQueue 的默认容量为 Integer.MAX_VALUE

# Executor 接口

  • 执行已提交的 Runnable 任务的对象
  • void execute(Runnable command):在未来某个时间执行给定的命令(该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 的实现决定

# ExecutorService 线程池接口

  • Executor 的子接口,代表尽快执行线程的线程池(只要线程池中有空闲线程,就立即执行线程任务),方法
  • void execute(Runnable command):在未来某个时间执行给定的命令
  • Future<?> submit(Runnable task):将一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Runnable 对象代表的任务,其中 Future 对象代表 Runnable 任务的返回值——但 run () 方法没有返回值,所以 Future 对象将在 run() 方法执行结束后返回 null,但可以调用 Future 的 isDone()isCancelled() 方法来获得 Runnable 对象的执行状态
  • Future<T> submit(Runnable task, T result):将一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Runnable 对象代表的任务,其中 result 显式指定线程执行结束后的返回值,所以 Future 对象将在 run() 方法执行结束后返回 result
  • Future<T> submit(Callable<T> task):将一个 Callable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Callable 对象代表的任务,其中 Future 代表 Callable 对象里 call() 方法的返回值(调用 AbstractExecutorService#submit 方法,将 Callable 对象包装成 FutureTask 对象,再调用 ThreadPoolExecutor#execute 方法 → ThreadPoolExecutor.Worker#run)
  • List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表
  • T invokeAny(Collection<? extends Callable<T>> tasks):执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果
  • void shutdown():启动线程池的关闭序列,调用该方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成,当线程池中的所有任务都执行完成后,池中的所有线程都会死亡
  • List<Runnable> shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
  • boolean awaitTermination(long timeout, TimeUnit unit)阻塞,直到所有任务完成执行

如果任务通过 execute 提交,出现异常会导致线程退出

ThreadPoolExecutor#runWorker,此时如果没有手动捕获异常,则默认交由 ThreadGroup 进行处理(向标准错误输出打印出现异常的线程名称和异常信息)

如果任务通过 submit 提交,应该通过拿到的 Future 调用其 get 方法来获得任务运行结果和可能出现的异常,否则异常可能就被生吞了

原因:FutureTask#run,在执行任务出现异常之后,异常存到了一个 outcome 字段中,只有在调用 get 方法获取 FutureTask 结果的时候,才会以 ExecutionException 的形式重新抛出异常

# ScheduledExecutorService 接口

  • ExecutorService 的子接口,代表可在指定延迟后或周期性地执行线程任务的线程池,方法
    • ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):指定 callable 任务将在 delay 延迟后执行
    • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):指定 command 任务将在 delay 延迟后执行
    • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):指定 command 任务将在 delay 延迟后执行,而且以设定频率重复执行(在 initialDelay 后开始执行,依次在 initialDelay+period、initialDelay+2*period… 处重复执行;如果当前任务耗时较多,超过定时周期 period,则当前任务结束后会立即执行)
    • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟(如果任务在任一次执行时遇到异常,就会取消后续执行;否则,只能通过程序来显式取消或终止该任务)

# ThreadPoolExecutor

  • 线程池的实现类
  • 构造器
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • 构造器中几个参数的含义:
    1. corePoolSize(线程池基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其它空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建(如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程)
    2. maximumPoolSize(线程池最大线程数):线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
    3. keepAliveTime(线程保持活动的时间):当线程数大于 corePoolSize 时,线程空闲之后保持存活的时间(默认核心线程始终保存活动,即 allowCoreThreadTimeOut = false)
    4. TimeUnit(线程保持活动时间的单位):可以使用 TimeUnit 时间单位来设置
    5. BlockingQueue(任务队列):用于保存等待执行的任务的阻塞队列,可以选择以下几个:
      • ArrayBlockingQueue:基于数组的阻塞队列,按照 FIFO 原则进行排序
      • LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。吞吐量通常要高于 ArrayBlockingQueue,Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 使用了这个队列
      • SynchronousQueue:一个不储存元素的阻塞队列,每一个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于 LinkedBlockingQueue,Executors.newCachedThreadPool() 使用了这个队列
      • PriorityBlockingQueue:一个具有优先级的无限阻塞队列
    6. ThreadFactory(线程工厂):用于设置创建线程的工厂
    7. RejectedExecutionHandler(饱和策略):这个本身是 Java 的一个接口,当队列和线程池都满了,需要一种策略处理新的任务,在这个类的最下部提供了四种内置的实现类:
      • AbortPolicy:直接抛出异常(默认策略)
      • CallerRunsPolicy:由提交任务的线程来执行
      • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前的任务
      • DiscardPolicy:不处理,直接丢弃
      • 自定义策略:实现 RejectedExecutionHandler 接口,自定义策略(如记录日志或持久化不能处理的任务)
  • 线程池默认的工作行为:
    • 不会初始化 corePoolSize 个线程,有任务来了才创建工作线程
    • 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中
    • 当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止
    • 如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理
    • 当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话,收缩线程到核心线程数
  • 提交一个新任务到线程池时,线程池的处理流程如下:
    1. 判断当前线程池的线程数是否小于设置的 corePoolSize,小于,则创建新线程来执行任务,否则
    2. 判断 workQueue 是否满了,没满,则将新提交的任务放到 workQueue 中,否则
    3. 判断当前线程池的线程数是否小于设置的 maximumPoolSize,小于,则创建新线程来执行任务,否则
    4. 交给饱和策略(RejectedExecutionHandler)来处理新任务
  • java.util.concurrent.ThreadPoolExecutor#runWorker
  • ThreadPoolExecutor 的**内部类工作线程 Worker 只要还存活**(有线程在跑),就会阻止 ThreadPoolExecutor 被 GC 回收

# ScheduledThreadPoolExecutor

  • ThreadPoolExecutor 的子类
  • 构造器
    ScheduledThreadPoolExecutor(int corePoolSize)
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
    ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)

# ForkJoinPool

  • 线程池的实现类,将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果

# 通过 ThreadPoolExecutor 的方式创建线程池

// 创建线程池
ExecutorService calcThreadPool = new ThreadPoolExecutor(8, 20, 5, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());

ExecutorService ioThreadPool = new ThreadPoolExecutor(100, 200, 5, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(1000), threadFactory, new ThreadPoolExecutor.AbortPolicy());

// 向线程池提交任务
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
Future<String> future = pool.submit(() -> Thread.currentThread().getName());

// 关闭线程池
pool.shutdown(); // gracefully shutdown


ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, threadFactory);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 创建线程工厂的方式

线程需要配置的属性:name、daemon、priority、uncaughtExceptionHandler

  • 方式 1:使用 org.jodd.jodd-core 包

    // jodd.util.concurrent.ThreadFactoryBuilder
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d")
            .setUncaughtExceptionHandler((thread, throwable) -> log.error("ThreadPool {} got exception", thread, throwable))
            .get();
    
    1
    2
    3
    4
    5
  • 方式 2:使用 com.google.guava 包

    // com.google.common.util.concurrent.ThreadFactoryBuilder
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d")
            .setUncaughtExceptionHandler((thread, throwable)-> log.error("Uncaught exception in thread '{}':", thread.getName(), throwable))
            .build();
    
    1
    2
    3
    4
    5
  • 方式 3:使用 commons-lang3 包

    // org.apache.commons.lang3.concurrent.BasicThreadFactory
    BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
            .namingPattern("schedule-pool-%d")
            .uncaughtExceptionHandler((thread, throwable)-> log.error("Exception in thread \"{}\"", thread.getName(), throwable))
            .daemon(false)
            .build();
    
    1
    2
    3
    4
    5
    6
  • 方式 4:使用 Spring 提供的 ThreadPoolTaskExecutor 或 ThreadPoolTaskScheduler,两者都是 ThreadFactory 的实现类,类中包含了线程池作为成员变量(ThreadPoolExecutor 或 ScheduledExecutorService)

    static {
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable)-> log.error("Thread {} got exception", thread.getName(), throwable));
    }
    
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5); // 默认为 1
        pool.setMaxPoolSize(50); // 默认为 Integer.MAX_VALUE
        pool.setQueueCapacity(1000); // 默认为 Integer.MAX_VALUE
        pool.setThreadNamePrefix("taskExecutor-");
        // 当线程池关闭时等待当前被调度的任务完成
        pool.setWaitForTasksToCompleteOnShutdown(true); // 默认为 false
        // pool.setThreadFactory(threadFactory); // 默认 this
        // pool.initialize(); // 初始化线程池,通过 @Bean 创建时,由于继承 InitializingBean,创建后会自动调用初始化方法
        return pool;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
// 激进线程池:优先开启更多的线程,而把队列当成一个后备方案
// 类似 org.apache.tomcat.util.threads.ThreadPoolExecutor
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10) {
    @Override
    public boolean offer(Runnable e) {
        // 先返回 false,造成队列满的假象,让线程池优先扩容
        return false;
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        2, 5,
        5, TimeUnit.SECONDS,
        queue, new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get(), (r, executor) -> {
    try {
        // 等出现拒绝后再加入队列
        // 如果希望队列满了阻塞线程而不是抛出异常,那么可以注释掉下面三行代码,修改为 executor.getQueue().put(r);
        if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {
            throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 线程池的状态监控
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
    if (threadPool.isTerminated()) return;
    log.info("=========================");
    log.info("Pool Name: {}", threadPool.getName());
    log.info("Pool Size: {}", threadPool.getPoolSize());
    log.info("Active Threads: {}", threadPool.getActiveCount());
    log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
    log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
    log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
1
2
3
4
5
6
7
8
9
10
11

# CompletionService 接口

  • 向 CompletionService 提交一批任务,任务完成后将结果放入 LinkedBlockingQueue 阻塞队列中,可通过 take() 获取最先完成的任务结果
  • 实例类:ExecutorCompletionService

# JUC

# 原子类

  • java.util.concurrent.atomic
  • 以原子方式更新值
  • 适用于并发量小或写操作少:AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新
  • 适用于读少写多:LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator

# CAS 机制

  • CAS(Compare and Swap),即比较并替换,是一种实现并发算法时常用到的技术,Java 并发包中的很多类都使用了 CAS 技术
  • CAS 使用了 3 个基本操作数:内存地址 V,旧的预期值 A,要修改的新值 B
  • CAS 指令执行时,当且仅当内存地址 V 中的实际值与预期值 A 相等时,处理器将内存地址 V 的值修改为 B,否则处理器什么都不做
  • 整个比较并替换的操作是一个原子操作(利用 sun.misc.Unsafe 提供的原子性操作方法)
  • CAS 的缺点:
    1. 循环时间长开销很大,CPU 空转(重试直到成功)
    2. 只能保证一个共享变量的原子操作
    3. ABA 问题:在取值和比较替换之间存在时间差
      解决方法:通过控制变量值的版本(标记位)或时间戳来保证 CAS 的正确性,如 AtomicMarkableReference、AtomicStampedReference

# 并发容器类

  • ConcurrentHashMap 通常优于同步的 HashMap
  • ConcurrentSkipListMap 通常优于同步的 TreeMap
  • 仅当列表读多写少时,CopyOnWriteArrayList 优于同步的 ArrayList
  • ArrayBlockingQueue、SynchronousQueue

# ConcurrentHashMap 原理

  • jdk1.7 使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁(Segment 继承了 ReentrantLock),当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
    • 共享变量都使用了 volatile 修饰
    • get 方法不需要加锁,除非读到的 value 是 null 时才会加锁重读
    • put 方法首先定位到 Segment,然后在 Segment 里进行插入操作
  • jdk1.8 以后采用了 CAS + synchronized 来保证并发安全性
    • 共享变量都使用了 volatile 修饰
    • 对于 put 操作:如果 key 对应的数组元素为 null,则通过 CAS 操作将其设置为当前值;如果 key 对应的数组元素(链表表头或树的根元素)不为 null,则对该元素使用 synchronized 关键字申请锁,然后进行操作
  • 不允许 null 作为 key 或 value

# CopyOnWriteArrayList 原理

  • 数组 array 都使用了 volatile 修饰
  • 当向容器添加、删除或修改元素的时候,先将当前容器进行 Copy(Arrays.copyOf),复制出一个新的容器,然后在新的容器里添加删除或修改元素,添加、删除或修改完元素之后,再将原容器的引用指向新的容器,整个过程加锁,保证了写的线程安全
  • 读操作不需要获得锁

# BlockingQueue 阻塞队列

  • 支持两个附加操作的 Queue:获取元素时等待队列变为非空,存储元素时等待队列变得可用
  • 已知实现类:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
  • 对于不能立即满足但可能在将来某一时刻可以满足的操作,BlockingQueue 的方法的处理方式有四种:1. 抛出异常;2. 返回特殊值(null 或 false,具体取决于操作);3. 在操作可以成功前,无限期地阻塞当前线程;4. 在放弃前只在给定的最大时间限制内阻塞

BlockingQueue方法

# 安全失败(fail-safe)

  • 采用安全失败机制的集合容器,在遍历时不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历
  • java.util.concurrent 包下的容器都是安全失败,可以在多线程下并发使用,并发修改

# CountDownLatch 计数器

  • 一个同步辅助类,在完成一组正在其它线程中执行的操作之前,让一个或多个线程一直等待(异步转同步)
  • 构造器 CountDownLatch(int count):构造一个用给定计数初始化的 CountDownLatch
  • 实例方法
    • void await():使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
    • boolean await(long timeout, TimeUnit unit)
    • void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程
    • long getCount():返回当前计数

# CyclicBarrier 栅栏

  • 一个同步辅助类,让一组线程互相等待,直到所有参与者(线程)都到达某个公共屏障点(common barrier point),该 barrier 在释放等待线程后可以重用
  • 构造器
    • CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动
    • CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
  • 实例方法
    • int await():在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待
    • int await(long timeout, TimeUnit unit)
    • int getNumberWaiting()
    • int getParties()

# Semaphore 信号量

  • 通常用于限制访问某些资源的线程数(限流)(控制并发度)
  • 构造器:Semaphore(int permits)Semaphore(int permits, boolean fair)
  • 实例方法:
    • void acquire():获取一个许可,如果没有就等待,直到获取许可为止
    • boolean tryAcquire(long timeout, TimeUnit unit):如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
    • void release():释放一个许可

# AQS 框架

  • AQS(AbstractQueuedSynchronizer.java)是 Java 实现同步锁的基础框架
  • AQS 工作原理:用 volatile int state 变量来表示当前同步锁的状态,用 getState() 获取同步锁的状态,用 compareAndSetState(int expect, int update) 来对 state 的状态进行修改

# ReentrantLock 的实现

  • 获取锁

    • lock() 方法:首先尝试 CAS 获得锁 compareAndSetState(0, 1),成功则把占有锁的线程设置为当前线程,返回,失败则调用 acquire 方法
    • acquire 方法为 AQS 实现的模板方法,它调用 tryAcquire 方法尝试获得锁,成功则返回,不成功则进入等待队列直至获取成功
    • tryAcquire() 方法:查询当前 state 值,如果为 0 则说明当前锁还未被其他线程获取,则尝试 CAS 获得锁,成功则把占有锁的线程设置为当前线程,返回 true,失败返回 false;如果 state 不为 0 则说明该锁已经被其他线程获取,则检查获得锁的线程是否是当前线程以实现可重入特性,如果是,则更新 state 的值,并返回 true
  • 释放锁,unlock() 直接调用 tryRelease() 方法:

    • 首先检查当前释放锁的线程,如果不是已占有锁的线程则抛出异常,因为 ReentrantLock 是独占式锁,释放锁的线程一定是占有锁的线程
    • 将 state 值减 1,判断:如果 state 值等于 0 的,说明获取锁的所有方法都已经返回,则锁释放成功;如果 state 值不等于 0,说明只是部分递归的方法返回,部分递归方法还未返回,则释放失败,锁依然被占有
  • 在并发环境下,获取锁和释放锁需要以下三个部件的协调:

    1. 锁状态:state 为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁;如果锁重入,state 进行 +1,释放锁就 -1,直到 state 又变为 0,代表释放锁,然后唤醒等待队列中的第一个线程,让其来占有锁
    2. 线程的阻塞和解除阻塞:AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程
    3. 等待队列:AQS 的等待队列基于是双向链表实现的,每个线程被包装成一个 Node 实例(属性:thread、waitStatus、pre、next),其中 head 节点不关联线程
  • 非公平锁 NonfairSync 和公平锁 Sync 的不同之处:

    1. 非公平锁在调用 lock() 后,首先就会调用 CAS 进行一次抢锁,如果这个时候锁没有被占用,就直接获取到锁并返回
    2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现这个时候锁被释放了(state == 0),非公平锁会直接 CAS 抢锁,而公平锁会通过 hasQueuedPredecessors() 判断等待队列是否有线程处于等待状态,如果有则将自己放在等待队列的队尾

# 锁的类型

# 乐观锁

  • 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据:采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作
  • Java 中的乐观锁基本都是通过 CAS 操作实现的,CAS 是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败

# 悲观锁

  • 悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会 block 直到拿到锁
  • Java 中的悲观锁就是 synchronized,AQS 框架下的锁则是先尝试 CAS 乐观锁去获取锁,获取不到,才会转换为悲观锁,如 RetreenLock

# Java 中的锁

  • JVM 从 jdk1.6 开始,引入了偏向锁与轻量级锁,默认启用了自旋锁,它们都属于乐观锁

# 偏向锁

  • 如果在运行过程中,同步锁只有当前线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁
  • 如果在运行过程中,遇到了其它线程抢占锁,则持有偏向锁的线程会被挂起,JVM 会消除它身上的偏向锁,将锁恢复到标准的轻量级锁
  • 通过消除资源无竞争情况下的同步原语,进一步提高了程序的运行性能

# 轻量级锁

  • 由偏向所升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁

# 自旋锁

  • 为了不让当前线程进入阻塞状态,让当前线程循环去获取锁
  • 在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区,如果还不能获得锁,才会将线程在操作系统层面挂起
  • 自旋锁的优缺点
    1. 使等待竞争锁的线程不需要做用户态和内核态之间的切换进入阻塞挂起状态,减少了不必要的上下文切换,执行速度快
    2. 需要消耗 CUP,让 CUP 在做无用功,所以需要设定一个自旋等待的最大时间(如设置为一个线程上下文切换的时间)

# 重量级锁 synchronized

  • 重量级锁把除了拥有锁的线程都阻塞,防止 CPU 空转
  • 非公平锁,可以重入
  • synchronized 的执行过程:
    1. 检测 Mark Word 中的偏向锁标识是否为 1,线程 ID 是不是当前线程 ID,如果是,表示当前线程处于偏向锁
    2. 如果不是,则使用 CAS 将 Mark Word 中线程 ID 设置为当前线程 ID,如果成功则表示当前线程获得偏向锁
    3. 如果失败,则说明发生竞争,撤销偏向锁,进而升级为轻量级锁
    4. 当前线程使用 CAS 将对象头的 Mark Word 替换为指向当前线程的栈帧中 Lock Record 的指针,如果成功,当前线程获得锁
    5. 如果失败,表示其它线程竞争锁,当前线程尝试使用自旋来获取锁
    6. 如果自旋成功则依然处于轻量级状态
    7. 如果自旋失败,则膨胀为重量级锁
  • 在所有的锁都启用的情况下,线程进入临界区时会先去获取偏向锁,如果已经存在偏向锁了,则会尝试获取轻量级锁,启用自旋锁,如果自旋也没有获取到锁,则使用重量级锁,没有获取到锁的线程阻塞挂起,直到持有锁的线程执行完同步块唤醒他们

# 锁优化

  1. 减少锁的时间:不需要同步执行的代码,能不放在同步快里面执行就不要放在同步快内,可以让锁尽快释放
  2. 减少锁的粒度:将物理上的一个锁,拆成逻辑上的多个锁,增加并行度,从而降低锁竞争,如 ConcurrentHashMap 在 jdk1.8 之前的版本
  3. 锁粗化:将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁,避免频繁的加锁解锁操作
  4. 使用读写锁:读操作加读锁,可以并发读,写操作使用写锁,只能单线程写,如 ReentrantReadWriteLock
  5. 使用 volatile + CAS 操作
Updated at: 2024-03-16 15:34:12